package org.example;

/**
 * Hello world!
 */

import org.example.consumer.KafkaConsumerBuilder;
import org.example.consumer.KafkaConsumerWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.annotation.EnableAsync;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


@SpringBootApplication
@EnableAsync
public class Application implements CommandLineRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);
    private static final String CORE_SIZE = "thread-pool.core-size";
    private static final String MAX_SIZE = "thread-pool.max-size";
    private static final String WORKER_SIZE = "thread-pool.worker-size";
    @Autowired
    private KafkaConsumerBuilder kafkaConsumerBuilder;
    @Autowired
    private Environment env;

    public static void main(String[] args) {
        // 加密口令密钥
        SpringApplication.run(Application.class, args);
    }

    /**
     * spring-cli启动
     *
     * @param args 启动参数
     */
    @Override
    public void run(String... args) {
        // 启动线程池, worker和core基本保持一致
        int corePoolSize = Integer.parseInt(env.getProperty(CORE_SIZE, "2"));
        int maxPoolSize = Integer.parseInt(env.getProperty(MAX_SIZE, "8"));
        int workerSize = Integer.parseInt(env.getProperty(WORKER_SIZE, "2"));
        ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.DiscardOldestPolicy());
        // 启动worker
        List<KafkaConsumerWorker> consumers = new ArrayList<>();
        for (int i = 0; i < workerSize; i++) {
            KafkaConsumerWorker kcw = new KafkaConsumerWorker(kafkaConsumerBuilder.create());
            consumers.add(kcw);
            pool.submit(kcw);
        }
        // 注册shutdownHook, 主线程关闭时清理关闭consumer和线程池
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            for (KafkaConsumerWorker kcw : consumers) {
                kcw.shutdown();
            }
            try {
                pool.shutdown();
                pool.awaitTermination(5000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                LOGGER.error("shutdown consumer pool failed.", e);
            }
        }));
    }
}
